package SparkMllibDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Weipengfei on 2017/5/3 0003.
  * ALS过滤算法
  */
object Recommend {
  def main(args: Array[String]) {
    println("====================数据准备阶段=====================")
    SetLogger
    val (ratings,movieTitle)=PrepareData()
    println("====================训练阶段=====================")
    print("开始使用"+ratings.count().toString+"条评比数据进行训练模型.....")
    val model=ALS.train(ratings ,10,5,0.01)
    println("训练完成！")
    println("====================推荐阶段=====================")
    recommend(model,movieTitle)
  }

  /**
    * 针对用户推荐电影
    *
    * @param model 训练好的ALS模型
    * @param movieTitle 电影id与名字的对应表
    * @param inputUserId 用户id
    */
  def RecommendMovies(model: MatrixFactorizationModel, movieTitle:Map[Int,String], inputUserId: Int): Unit ={
    val recommendMovie=model.recommendProducts(inputUserId,10)//获取针对inputUserId推荐前十部电影
    println("针对用户id："+inputUserId+" 推荐下列电影：")
    var i=1
    recommendMovie.foreach{
      movie=>println(i.toString+"."+movieTitle(movie.product.toInt)+" 评分："+movie.rating.toString)
        i+=1
    }
  }

  /**
    * 针对电影推荐给用户
    *
    * @param model 训练好的ALS模型
    * @param movieTitle 电影id与名字的对应表
    * @param inputMovieId 电影id
    */
  def RecommendUsers(model: MatrixFactorizationModel, movieTitle: Map[Int, String], inputMovieId: Int): Unit ={
    val recommendUser=model.recommendUsers(inputMovieId,10)//获取针对inputMovieId推荐前十名用户
    println("针对电影id："+inputMovieId+" 推荐下列用户id：")
    var i=1
    recommendUser.foreach{
      u=>println(i.toString+".用户id："+u.user+" 评分："+u.rating)
        i+=1
    }
  }

  /**
    * 实行具体推荐
    *
    * @param model 训练好的推荐模型
    * @param movieTitle 电影id与名字的对应表
    */
  def recommend(model:MatrixFactorizationModel, movieTitle:Map[Int,String]): Unit ={
    var choose=""
    while(choose != "3"){
      print("请选择要推荐类型 1.针对用户推荐电影 2.针对电影推荐给感兴趣的用户 3.离开？")
      choose=readLine()
      if(choose=="1"){
        print("请输入用户id？")
        val inputUserId=readLine()
        RecommendMovies(model,movieTitle,inputUserId.toInt)
      }else if(choose=="2"){
        print("请输入电影的id？")
        val inputMovieId=readLine()
        RecommendUsers(model,movieTitle,inputMovieId.toInt)
      }
    }
  }

  /**
    * 设置日志及乱七八糟的配置
    */
  def SetLogger: Unit ={
    System.setProperty("hadoop.home.dir", "E://hadoop-liyadong//hadoop-2.7.1")
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("com").setLevel(Level.OFF)
    System.setProperty("spark.ui.showConsoleProgress","false")
    Logger.getRootLogger.setLevel(Level.OFF)
  }

  /**
    * 数据准备
    *
    * @return （RDD[Rating[Int] (用户id，电影id，评分)，电影id与名称的对照表）
    */
  def PrepareData():(RDD[Rating],Map[Int,String])={
    val sc=new SparkContext(new SparkConf().setAppName("Recommend").setMaster("local[2]").set("spark.testing.memory","21474800000"))
    //创建用户评分数据
    print("开始读取用户评分数据中...")
    val rawUserData=sc.textFile("D://u.data")
    val rawRatings=rawUserData.map(_.split("\t").take(3))
    val ratingsRDD=rawRatings.map{
    case Array(user,movie,rating) => Rating( user.toInt ,movie.toInt,rating.toFloat)
  }
    println("共计："+ratingsRDD.count().toString+"条评分")
    //创建电影ID和名称对应表
    print("开始读取电影数据中...")
    val itemRDD=sc.textFile("D://u.item")
    val moiveTitle=itemRDD.map(_.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collect().toMap
    //显示数据记录数
    val numRatings=ratingsRDD.count()
    val numUser=ratingsRDD.map(_.user).distinct().count()
    val numMoive=ratingsRDD.map(_.product).distinct().count()
    println("共计：评分"+numRatings+"条 用户"+numUser+"个 电影"+numMoive+"个")
    (ratingsRDD,moiveTitle)
  }
}
